package kafka_go

import (
	"fmt"
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"strconv"
	"strings"
)

type ProductBalanceType int
type ProductAckType int

const (

	//默认最大连接数
	DEFAULT_MAX_CONNECTION int32 = 5

	//连接轮询
	PRODUCT_CONN_STRATEGY_BALANCE_ROUNDROBIN ProductBalanceType = 0

	//ack确认类型- 不等待 broker 的 ack，这一操作提供了一个最低的延迟，broker 一接收到还 没有写入磁盘就已经返回，当 broker 故障时有可能丢失数据
	PRODUCT_ACK_TYPE_NORNAL ProductAckType = 0

	//等待 broker 的 ack，partition 的 leader 落盘成功后返回 ack，如果在 follower 同步成功之前 leader 故障，那么将会丢失数据
	PRODUCT_ACK_TYPE_FOLLOWER ProductAckType = 1

	//等待 broker 的 ack，partition 的 leader 和 follower （ISRL里的follower，不是全部的follower）全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后，broker 发送 ack 之前，leader 发生故障，那么会造成数据重复
	//开启事务后可避免数据重复
	PRODUCT_ACK_TYPE_ALL ProductAckType = -1
)

/**
生产者
*/
type BuildProduct struct {
	connStrategy    ProductBalanceType //连接策略 默认为轮询
	maxConnection   int32              //最大连接数
	isDebug         bool               //是否是调试
	isTransactional bool               //是否是事务提交
	ackType         ProductAckType     //ack确认类型
	name            string             //生产者名称
	addr            string             //生产者服务地址  多个以,分隔
	config          *sarama.Config     //其它配置
}

/**

 */
func NewBuildProduct(name string, addr string) BuildProductApi {
	return &BuildProduct{
		connStrategy:    PRODUCT_CONN_STRATEGY_BALANCE_ROUNDROBIN,
		maxConnection:   DEFAULT_MAX_CONNECTION,
		isDebug:         false,
		isTransactional: false,
		ackType:         PRODUCT_ACK_TYPE_NORNAL,
		name:            name,
		addr:            addr,
		config:          sarama.NewConfig(),
	}
}

func (this *BuildProduct) SetDebug(isDebug bool) BuildProductApi {
	this.isDebug = isDebug
	return this
}
func (this *BuildProduct) SetTransactional(isTransactional bool) BuildProductApi {
	this.isTransactional = isTransactional
	return this
}

func (this *BuildProduct) GetTransactional() bool {
	return this.isTransactional
}

func (this *BuildProduct) GetName() string {
	return strings.TrimSpace(this.name)
}
func (this *BuildProduct) GetAddr() string {
	return this.addr
}
func (this *BuildProduct) GetAddrSlice() []string {
	return strings.Split(this.addr, ",")
}
func (this *BuildProduct) GetMaxConnection() int32 {
	return this.maxConnection
}
func (this *BuildProduct) GetConnStrategy() ProductBalanceType {
	return this.connStrategy
}
func (this *BuildProduct) GetConfig() *sarama.Config {
	return this._getConfig()
}

func (this *BuildProduct) SetMaxConnection(maxConnection int32) BuildProductApi {
	this.maxConnection = maxConnection
	return this
}

func (this *BuildProduct) SetAckType(ackType ProductAckType) BuildProductApi {
	this.ackType = ackType
	return this
}

func (this *BuildProduct) SetConfig(config *sarama.Config) BuildProductApi {
	this.config = config
	return this
}

func (this *BuildProduct) GetHashCode() int64 {
	return hashCode(this.name)
}

func (this *BuildProduct) GetHashCodeString() string {
	hashCode := this.GetHashCode()
	return strconv.FormatInt(hashCode, 10)
}

func (this *BuildProduct) ToString() string {
	return fmt.Sprintf("name:%s, addr:%s,", this.name, this.addr)
}

func (this *BuildProduct) _getConfig() *sarama.Config {
	this.config.Producer.Return.Successes = true

	switch this.ackType {
	case PRODUCT_ACK_TYPE_NORNAL:
		this.config.Producer.RequiredAcks = sarama.NoResponse
	case PRODUCT_ACK_TYPE_FOLLOWER:
		this.config.Producer.RequiredAcks = sarama.WaitForLocal
	case PRODUCT_ACK_TYPE_ALL:
		this.config.Producer.RequiredAcks = sarama.WaitForAll
		this.config.Producer.Return.Successes = true
	default:
		this.config.Producer.RequiredAcks = sarama.NoResponse
	}
	if this.isTransactional {
		this.config.Producer.RequiredAcks = sarama.WaitForAll
		this.config.Producer.Return.Successes = true
	}
	return this.config
}
